Web Streams
You can use Piscina with modern Web APIs such as Web Streams, so that streaming data can be processed in worker threads. This is particularly useful for large datasets or real-time data processing, where multi-threading can bring significant benefits.
In the main script (index.mjs), we create a Piscina instance and set up a ReadableStream that generates a sequence of numbers. We also create a WritableStream that simply logs each chunk it receives. The script then runs a task in the worker pool, passing both streams as arguments.
The worker script (worker.mjs) defines an async function that reads from the provided ReadableStream and writes to the WritableStream. It uses a for-await loop to iterate over the chunks in the ReadableStream, writing each chunk to the WritableStream.
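The read-and-write loop described above can be tried on a single thread before involving a worker pool. The following is a minimal sketch, not part of the Piscina example: `processStreams` and its `transform` parameter are hypothetical names, and it assumes Node.js 18 or later, where `ReadableStream` and `WritableStream` are available as globals and readable streams support async iteration.

```javascript
// Hypothetical variant of the worker task: applies `transform` to each chunk
// before writing it. Assumes Node.js 18+ (global ReadableStream/WritableStream).
async function processStreams({ readable, writable }, transform = (x) => x) {
  const writer = writable.getWriter();
  try {
    for await (const chunk of readable) {
      await writer.write(transform(chunk));
    }
    // Wait until the sink has finished closing.
    await writer.close();
  } catch (err) {
    // Signal the failure to the sink, then rethrow to the caller.
    await writer.abort(err);
    throw err;
  }
}

// Local demonstration without a worker pool.
const received = [];
const done = processStreams(
  {
    readable: new ReadableStream({
      start(controller) {
        for (const n of [1, 2, 3]) controller.enqueue(n);
        controller.close();
      }
    }),
    writable: new WritableStream({
      write(chunk) {
        received.push(chunk);
      }
    })
  },
  (n) => n * 2
);
done.then(() => console.log(received));
```

In a real worker, the per-chunk work would be defined inside the worker file itself, since functions cannot be sent between threads.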
The example is shown first in JavaScript, then in TypeScript.
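// worker.mjs
export default async function ({ readable, writable }) {
  const writer = writable.getWriter();
  // Forward every chunk from the readable side to the writable side.
  for await (const chunk of readable) {
    await writer.write(chunk);
  }
  // Wait for the stream to finish closing before the task resolves.
  await writer.close();
}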
// index.mjs
import Piscina from 'piscina';
import {
  ReadableStream,
  WritableStream
} from 'node:stream/web';

const pool = new Piscina({
  filename: new URL('./worker.mjs', import.meta.url).href
});

// Source stream: emits the queued numbers one at a time, then closes.
const readable = new ReadableStream({
  start () {
    this.chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
  },
  pull (controller) {
    const chunk = this.chunks.shift();
    controller.enqueue(chunk);
    if (this.chunks.length === 0) {
      controller.close();
    }
  }
});

// Sink stream: logs each chunk it receives.
const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

// Transfer both streams to the worker instead of cloning them.
(async function () {
  await pool.run({ readable, writable }, { transferList: [readable, writable] });
})();
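// TypeScript worker (imported as './worker' by the main script)
import { resolve } from 'path';
import { ReadableStream, WritableStream } from 'node:stream/web';

// Assumed export: the main script imports `filename` from this module
// and passes it to the wrapper through workerData.
export const filename = resolve(__filename);

export default async function ({ readable, writable }: { readable: ReadableStream, writable: WritableStream }): Promise<void> {
  const writer = writable.getWriter();
  for await (const chunk of readable) {
    await writer.write(chunk);
  }
  // Wait for the stream to finish closing before the task resolves.
  await writer.close();
}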
// TypeScript main script
import { resolve } from 'path';
import Piscina from 'piscina';
import {
  ReadableStream,
  WritableStream
} from 'node:stream/web';
import { filename } from './worker';

const pool = new Piscina({
  filename: resolve(__dirname, 'workerWrapper.js'),
  workerData: { fullpath: filename },
});

// The queue lives in a plain variable rather than on `this`, because the
// underlying source type has no `chunks` property to type-check against.
const chunks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

// Source stream: emits the queued numbers one at a time, then closes.
const readable = new ReadableStream({
  pull (controller) {
    const chunk = chunks.shift();
    controller.enqueue(chunk);
    if (chunks.length === 0) {
      controller.close();
    }
  }
});

// Sink stream: logs each chunk it receives.
const writable = new WritableStream({
  write (chunk) {
    console.log(chunk);
  }
});

// Transfer both streams to the worker instead of cloning them.
(async function () {
  await pool.run({ readable, writable }, { transferList: [readable, writable] });
})();
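// workerWrapper.js
const { workerData } = require('worker_threads');

// Register ts-node only when the target worker is a TypeScript file.
if (workerData.fullpath.endsWith('.ts')) {
  require('ts-node').register();
}

// Re-export the real worker module so Piscina can load its task function.
module.exports = require(workerData.fullpath);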
A key aspect of this example is the use of the transferList option when running the task. This allows the ReadableStream and WritableStream instances to be transferred to the worker thread rather than cloned. Streams cannot be copied by the structured clone algorithm used to pass task data between threads, so they must be transferred for the worker to read from and write to them.
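To see why the streams must be listed for transfer, it can help to try cloning one directly. The sketch below is an illustration only, not part of the Piscina example: `canClone` is a hypothetical helper, and it assumes Node.js 18 or later, where `structuredClone` and `ReadableStream` are globals. `structuredClone` applies the same structured clone algorithm that is used when data is posted to a worker thread.

```javascript
// Hypothetical helper: reports whether a value survives structured cloning
// when nothing is listed for transfer. Assumes Node.js 18+ globals.
function canClone(value) {
  try {
    structuredClone(value);
    return true;
  } catch {
    // Values that can only be transferred, such as streams, end up here.
    return false;
  }
}

const plainData = { numbers: [1, 2, 3] };
const stream = new ReadableStream();

console.log('plain object can be cloned:', canClone(plainData));
console.log('ReadableStream can be cloned:', canClone(stream));
```

Plain data is copied without trouble, while the stream is rejected unless it is named in the transfer list, which is what the transferList option does in the call to pool.run.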
You can also check out this example on GitHub.